Solutions/CofenseTriage/Data Connectors/CofenseTriageDataConnector/NonCofenseBasedIndicatorCreatorToCofense/sentinel.py (727 lines of code) (raw):

"""This file contains implementation of fetching sentinel TI indicator and creating cofense indicator.""" import time import json import inspect from datetime import datetime, timedelta from ..SharedCode.consts import ( AZURE_SUBSCRIPTION_ID, AZURE_RESOURCE_GROUP, AZURE_WORKSPACE_NAME, LOGS_STARTS_WITH, CONNECTION_STRING, QUERY_SENTINEL_INDICATORS_URL, QUERY_SENTINEL_PAGESIZE, SENTINEL_TO_COFENSE, SENTINEL_429_SLEEP, SENTINEL_SOURCE_PREFIX, SENTINEL_DATETIME_FORMAT, NON_COFENSE_THROTTLE_LIMIT, ) from ..SharedCode.logger import applogger from ..SharedCode.cofense_exception import CofenseException from ..SharedCode.state_manager import StateManager from ..SharedCode.utils import ( auth_sentinel, make_rest_call, sentinel_to_cofense_threat_level_mapping, check_environment_var_exist, ) from .cofense import CofenseTriage from .sentinel_to_cofense_mapping import SentinelToCofenseMapping class MicrosoftSentinel: """This class contains methods to get threat intelligence indicator from Microsoft Sentinel.""" def __init__(self) -> None: """Initialize instance variable for class.""" __method_name = inspect.currentframe().f_code.co_name # To check the environment variable. check_environment_var_exist(SENTINEL_TO_COFENSE) applogger.info( "{}(method={}) : {} : " "Started execution of posting indicators into Cofense Triage from " "Microsoft Sentinel Threat Intelligence.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) self.bearer_token = auth_sentinel(SENTINEL_TO_COFENSE) self.cofense_object = CofenseTriage() self.sentinel_cofense_mapping_obj = SentinelToCofenseMapping() self.get_sentinel_checkpoint_data() self.get_sentinel_cofense_id_table() self.indicator_count = 0 self.page_size = 100 self.throttle_flag = False self.set_throttle_limit() def set_throttle_limit(self): """To add a limit in indicator creation in MS Defender in one execution.""" __method_name = inspect.currentframe().f_code.co_name try: throttle_input = NON_COFENSE_THROTTLE_LIMIT if throttle_input == "None": self.page_size = QUERY_SENTINEL_PAGESIZE self.throttle_flag = False elif ( throttle_input.isdigit() and int(throttle_input) >= 1 and int(throttle_input) <= 100 ): self.page_size = int(throttle_input) self.throttle_flag = True else: raise CofenseException( "Invalid non cofense throttle limit. Valid limit range is from 1 to 100 " "and None(for no throttling)" ) except Exception as error: applogger.warning( "{}(method={}) : {} : " "Error occurred while getting the throttle limit for non cofense indicators. " "Error : {}. Starting normal execution(without throttle limit).".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) self.page_size = QUERY_SENTINEL_PAGESIZE self.throttle_flag = False def get_sentinel_checkpoint_data(self): """To get the sentinel checkpoint data from state manager.""" __method_name = inspect.currentframe().f_code.co_name # Initializing state manager for sentinel and cofense indicator id table. self.sentinel_checkpoint_state = StateManager( connection_string=CONNECTION_STRING, file_path="sentinel_checkpoint", ) try: sentinel_checkpoint_data = self.sentinel_checkpoint_state.get( SENTINEL_TO_COFENSE ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Error occurred while getting sentinel checkpoint data from sentinel checkpoint state manager.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise error # If checkpoint data is not none, then fetch the checkpoint fields. if sentinel_checkpoint_data is not None: self.sentinel_checkpoint_json_data = json.loads(sentinel_checkpoint_data) self.new_execution_flag = self.sentinel_checkpoint_json_data.get( "new_execution_flag" ) self.current_checkpoint_indicator_date = ( self.sentinel_checkpoint_json_data.get( "current_checkpoint_indicator_date" ) ) self.next_checkpoint_indicator_date = ( self.sentinel_checkpoint_json_data.get("next_checkpoint_indicator_date") ) if self.new_execution_flag == "False": self.sentinel_skiptoken = self.sentinel_checkpoint_json_data.get( "last_execution_skip_token" ) applogger.debug( "{}(method={}) : {}: " "Last checkpoint skip token is : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, self.sentinel_skiptoken, ) ) else: self.sentinel_skiptoken = "" applogger.debug( "{}(method={}) : {} : " "Sentinel checkpoint state manager data fetch successfully.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) # Convert string date to datetime object. self.current_checkpoint_indicator_date = self.convert_datetime_format( self.sentinel_checkpoint_json_data.get( "current_checkpoint_indicator_date" ) ) applogger.info( "{}(method={}) : {} : " "Last checkpoint date is : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, self.current_checkpoint_indicator_date, ) ) else: # If checkpoint is none then initialize from starting. # If checkpoint file is not found then, it is first run. self.sentinel_checkpoint_json_data = {} self.sentinel_checkpoint_json_data["new_execution_flag"] = "True" self.sentinel_checkpoint_json_data["last_execution_skip_token"] = "" # In first run we need to fetch last 15 days of indicators from sentinel TI. self.sentinel_checkpoint_json_data["current_checkpoint_indicator_date"] = ( datetime.utcnow() - timedelta(days=15) ).isoformat() self.sentinel_checkpoint_json_data["next_checkpoint_indicator_date"] = "" try: self.sentinel_checkpoint_state.post( json.dumps(self.sentinel_checkpoint_json_data) ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Error occurred while posting checkpoint data to sentinel checkpoint state manager.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise error self.new_execution_flag = self.sentinel_checkpoint_json_data.get( "new_execution_flag" ) self.sentinel_skiptoken = self.sentinel_checkpoint_json_data.get( "last_execution_skip_token" ) self.next_checkpoint_indicator_date = ( self.sentinel_checkpoint_json_data.get("next_checkpoint_indicator_date") ) applogger.info( "{}(method={}) : {} : " "Sentinel checkpoint state manager data not found. Creating it.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) # Convert string date to datetime object. self.current_checkpoint_indicator_date = self.convert_datetime_format( self.sentinel_checkpoint_json_data.get( "current_checkpoint_indicator_date" ) ) applogger.info( "{}(method={}) : {} : " "Getting indicators data of last 15 days from : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, self.current_checkpoint_indicator_date, ) ) def convert_datetime_format(self, datetime_string): """To convert datetime string to datetime type for comparison of dates.""" __method_name = inspect.currentframe().f_code.co_name try: if datetime_string is None or datetime_string == "": return datetime_string else: # To do: Check with Z in datetime format. return datetime.strptime( (datetime_string).strip()[:26].rstrip("Z"), SENTINEL_DATETIME_FORMAT, ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Expecting datetime in %Y-%m-%dT%H:%M:%S.%f format, getting datetime in {} format.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error, datetime_string, ) ) raise error def convert_sentinel_datetime_format(self, indicator): """To convert sentinel indicator lastUpdatedTimeUtc format.""" __method_name = inspect.currentframe().f_code.co_name try: return self.convert_datetime_format( indicator.get("properties", {}).get("lastUpdatedTimeUtc") ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Error occurred in Sentinel Threat Indicator with name: {}.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error, indicator.get("name", ""), ) ) raise error def get_sentinel_cofense_id_table(self): """To get sentinel cofense id table from state manager.""" __method_name = inspect.currentframe().f_code.co_name # Initializing state manager for sentinel and cofense indicator id table. self.sentinel_cofense_id_table_state = StateManager( connection_string=CONNECTION_STRING, file_path="sentinel_cofense_id_table", ) try: sentinel_cofense_id_table_data = self.sentinel_cofense_id_table_state.get( SENTINEL_TO_COFENSE ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Error occurred while getting sentinel cofense mapping id table data " "from sentinel cofense id table state manager.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise error # To do: remove "is not None". if sentinel_cofense_id_table_data is not None: self.sentinel_cofense_id_table = json.loads(sentinel_cofense_id_table_data) applogger.info( "{}(method={}) : {} : " "Sentinel Cofense ID table successfully fetched.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) else: # if table data is none then initialize the id table. self.sentinel_cofense_id_table = {} applogger.info( "{}(method={}) : {} : " "Sentinel Cofense ID table not found. Creating it.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) def update_checkpoint(self, sentinel_indicator_nextlink): """To update the checkpoint in state manager.""" __method_name = inspect.currentframe().f_code.co_name if ( sentinel_indicator_nextlink is not None and sentinel_indicator_nextlink != "" ): # Checkpoint is managed by skipToken fetched in nextLink in sentinel response. self.sentinel_skiptoken = sentinel_indicator_nextlink.split("$skipToken=")[ 1 ] self.sentinel_checkpoint_json_data[ "last_execution_skip_token" ] = self.sentinel_skiptoken try: self.sentinel_checkpoint_state.post( json.dumps(self.sentinel_checkpoint_json_data) ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Error occurred while posting current checkpoint data to sentinel checkpoint state manager.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise error applogger.debug( "{}(method={}) : {} : " "Skip Token stored as checkpoint : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, self.sentinel_skiptoken, ) ) else: # If there is no further indicators from sentinel TI. self.complete_current_execution(__method_name) def complete_current_execution(self, __method_name): """To update the checkpoint fields in sentinel checkpoint state manager.""" self.sentinel_checkpoint_json_data["new_execution_flag"] = "True" self.sentinel_checkpoint_json_data["last_execution_skip_token"] = "" self.sentinel_checkpoint_json_data[ "current_checkpoint_indicator_date" ] = self.sentinel_checkpoint_json_data.get("next_checkpoint_indicator_date") self.sentinel_checkpoint_json_data["next_checkpoint_indicator_date"] = "" # Saving flag into the sentinel checkpoint state manager. try: self.sentinel_checkpoint_state.post( json.dumps(self.sentinel_checkpoint_json_data) ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Error occurred while posting current checkpoint data to sentinel checkpoint state manager.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise error # Updating the flag value. self.new_execution_flag = self.sentinel_checkpoint_json_data.get( "new_execution_flag" ) # Reset the checkpoint(skipToken) for next execution. self.sentinel_skiptoken = self.sentinel_checkpoint_json_data.get( "last_execution_skip_token" ) self.current_checkpoint_indicator_date = self.convert_datetime_format( self.sentinel_checkpoint_json_data.get("current_checkpoint_indicator_date") ) self.next_checkpoint_indicator_date = self.sentinel_checkpoint_json_data[ "next_checkpoint_indicator_date" ] applogger.debug( "{}(method={}) : {}: " "Completed posting and updating current execution indicators from Microsoft Sentinel " "into Cofense Triage. Starting Next execution.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) applogger.info( "{}(method={}) : {}: " "Checkpoint date stored : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, self.current_checkpoint_indicator_date, ) ) def get_indicators_from_sentinel(self): """To get indicators from Microsoft Sentinel threat intelligence.""" try: __method_name = inspect.currentframe().f_code.co_name applogger.info( "{}(method={}) : {} : " "Started fetching indicators from Microsoft Sentinel Threat Intelligence.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) retry_count_429 = 0 retry_count_401 = 0 while retry_count_429 <= 3 and retry_count_401 <= 1: query_indicator_url = QUERY_SENTINEL_INDICATORS_URL.format( subscriptionId=AZURE_SUBSCRIPTION_ID, resourceGroupName=AZURE_RESOURCE_GROUP, workspaceName=AZURE_WORKSPACE_NAME, ) headers = { "Content-Type": "application/json", "Authorization": "Bearer {}".format(self.bearer_token), } body = { "pageSize": self.page_size, "sortBy": [ {"itemKey": "lastUpdatedTimeUtc", "sortOrder": "descending"} ], "skipToken": self.sentinel_skiptoken, } get_indicator_response = make_rest_call( url=query_indicator_url, method="POST", azure_function_name=SENTINEL_TO_COFENSE, payload=json.dumps(body), headers=headers, ) # If response status code is 200 to 299. if ( get_indicator_response.status_code >= 200 and get_indicator_response.status_code <= 299 ): sentinel_indicator_json = json.loads(get_indicator_response.text) sentinel_indicator_json_list = sentinel_indicator_json.get( "value", [] ) # Posting indicators into cofense. post_indicators_return = self.post_indicators( sentinel_json_indicator_list=sentinel_indicator_json_list, cofense_object=self.cofense_object, ) applogger.info( "{}(method={}) : {} : " "Completed the execution of total {} indicators from Microsoft Sentinel" " to Cofense Triage.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, self.indicator_count, ) ) # If return is False, it means no more indicator to fetch. So exit the python file. if post_indicators_return is False: applogger.warning( "{}(method={}) : {}: url: {}, Status Code : {} : " "No more indicators to fetch from Microsoft Sentinel. Exiting the function.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, query_indicator_url, get_indicator_response.status_code, ) ) # Exit from function app. return True # Updating the checkpoint. if self.new_execution_flag == "False": sentinel_indicator_nextlink = sentinel_indicator_json.get( "nextLink", "" ) self.update_checkpoint(sentinel_indicator_nextlink) if self.throttle_flag: applogger.warning( "{}(method={}) : {}: url: {}, Status Code : {} : " "Throttle limit reached. Exiting the function.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, query_indicator_url, get_indicator_response.status_code, ) ) return True # response status code is 429. elif get_indicator_response.status_code == 429: retry_count_429 += 1 applogger.error( "{}(method={}) : {}: url: {}, Status Code : {} : " "Getting 429 from sentinel get indicators api call. Retrying again after {} seconds.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, query_indicator_url, get_indicator_response.status_code, SENTINEL_429_SLEEP, ) ) applogger.debug( "{}(method={}) : {}: url: {}, Status Code : {}, Response reason: {}, Response: {} : " "Getting 429 from sentinel get indicators api call. Retry count: {}.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, query_indicator_url, get_indicator_response.status_code, get_indicator_response.reason, get_indicator_response.text, retry_count_429, ) ) # sleep for 60 seconds. time.sleep(SENTINEL_429_SLEEP) # response is 401, access token is expired. elif get_indicator_response.status_code == 401: retry_count_401 = retry_count_401 + 1 applogger.error( "{}(method={}) : {} : url: {}, Status Code : {}: Error Reason: {} : " "Sentinel access token expired, generating new access token.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, query_indicator_url, get_indicator_response.status_code, get_indicator_response.reason, ) ) applogger.debug( "{}(method={}) : {} : url: {}, Status Code : {}, Error Reason: {}, Response: {} : Sentinel" " access token expired, generating new access token. Retry count: {}.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, query_indicator_url, get_indicator_response.status_code, get_indicator_response.reason, get_indicator_response.text, retry_count_401, ) ) self.bearer_token = auth_sentinel(SENTINEL_TO_COFENSE) # response status code is not 200 to 299, 429 and 401. else: applogger.error( "{}(method={}) : {} : url: {}, Status Code : {} : Error while fetching indicators" " from sentinel threat intelligence. Error Reason: {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, query_indicator_url, get_indicator_response.status_code, get_indicator_response.reason, ) ) applogger.debug( "{}(method={}) : {} : url: {}, Status Code : {}, Error Reason: {}, Response: {} :" " Error while fetching indicators from sentinel threat intelligence.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, query_indicator_url, get_indicator_response.status_code, get_indicator_response.reason, get_indicator_response.text, ) ) # raise the exception to exit the function app. raise CofenseException() # retry count exceeded. applogger.error( "{}(method={}) : {} : Max retries exceeded for fetching indicators from sentinel.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) # raising the exception to exit the function app. raise CofenseException() except CofenseException: raise CofenseException() def post_indicators(self, sentinel_json_indicator_list, cofense_object): """To post and update the indicators into Cofense Triage.""" try: __method_name = inspect.currentframe().f_code.co_name applogger.debug( "{}(method={}) : {} : " "Posting and updating indicators into Cofense Triage.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, ) ) # posting and updating indicators for indicator in sentinel_json_indicator_list: current_indicator_updated_date = self.convert_sentinel_datetime_format( indicator=indicator ) if self.new_execution_flag == "True": if ( current_indicator_updated_date <= self.current_checkpoint_indicator_date ): # exit the class. execution is completed. return False self.sentinel_checkpoint_json_data[ "next_checkpoint_indicator_date" ] = indicator.get("properties", {}).get("lastUpdatedTimeUtc") self.sentinel_checkpoint_json_data["new_execution_flag"] = "False" try: self.sentinel_checkpoint_state.post( json.dumps(self.sentinel_checkpoint_json_data) ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Error occurred while posting sentinel checkpoint data to " "sentinel checkpoint state manager.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error, ) ) raise error self.next_checkpoint_indicator_date = ( self.sentinel_checkpoint_json_data.get( "next_checkpoint_indicator_date" ) ) applogger.debug( "{}(method={}) : {}: " "Next execution checkpoint date stored : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, self.next_checkpoint_indicator_date, ) ) self.new_execution_flag = self.sentinel_checkpoint_json_data.get( "new_execution_flag" ) # Completed current execution. if ( current_indicator_updated_date <= self.current_checkpoint_indicator_date ): self.complete_current_execution(__method_name) return True # getting indicator source and mapping sentinel source to cofense. sentinel_indicator_source = indicator.get("properties", {}).get( "source", None ) indicator_source = ( self.sentinel_cofense_mapping_obj.get_indicator_source( sentinel_indicator_source ) ) # convert sentinel indicator type to cofense indicator type. indicator_threat_type = ( self.sentinel_cofense_mapping_obj.get_cofense_threat_type(indicator) ) # Convert confidence integer to Level of threat (Malicious, Suspicious, or Benign). confidence = indicator.get("properties", {}).get("confidence", "") threat_level = sentinel_to_cofense_threat_level_mapping( confidence=confidence, azure_function_name=SENTINEL_TO_COFENSE ) # get the threat value from the sentinel indicator data. sentinel_pattern = indicator.get("properties", {}).get("pattern", None) threat_value = self.sentinel_cofense_mapping_obj.get_threat_value( sentinel_pattern ) # if indicator source and sentinel indicator pattern type is not as need. # To do: remove "is not None". if ( indicator_source is not None and indicator_threat_type is not None and threat_level is not None and threat_value is not None ): # if the sentinel indicator name is connected with a cofense id then update, # the indicator data into cofense triage. sentinel_indicator_name = indicator.get("name", "") if self.sentinel_cofense_id_table.get(sentinel_indicator_name): # get the cofense id from the id table. cofense_id = self.sentinel_cofense_id_table.get( indicator.get("name", "") ) # Update the indicator. self.update_indicator( cofense_object=cofense_object, cofense_id=cofense_id, threat_level=threat_level, ) # or else create a new indicator into cofense triage and relate the sentinel and cofense id, # into id table. else: # Create indicator into cofense. self.create_indicator( cofense_object=cofense_object, indicator_threat_type=indicator_threat_type, indicator_source=indicator_source, threat_level=threat_level, threat_value=threat_value, sentinel_indicator_name=indicator.get("name"), ) # Update the total executed indicators count. self.indicator_count += 1 # Completed current execution. return True except CofenseException as error: applogger.error( "{}(method={}) : {} : Error occurred while posting indicator in cofense : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise CofenseException() def create_indicator( self, cofense_object, indicator_threat_type, indicator_source, threat_level, threat_value, sentinel_indicator_name, ): """To create indicators into cofense triage.""" try: __method_name = inspect.currentframe().f_code.co_name # Posting indicator into cofense triage. ( post_indicator_status_code, post_response_json, ) = cofense_object.post_indicators( threat_level=threat_level, threat_type=indicator_threat_type, threat_value=threat_value, threat_source=SENTINEL_SOURCE_PREFIX + indicator_source, sentinel_indicator_name=sentinel_indicator_name, ) # if status code is 200-299. if post_indicator_status_code >= 200 and post_indicator_status_code <= 299: # getting the cofense triage indicator id. cofense_id = post_response_json.get("data", {}).get("id") applogger.debug( "{}(method={}) : {} : " "Indicator successfully created into Cofense with id : {}.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, cofense_id ) ) # updating the sentinel name - cofense id into the id table. self.sentinel_cofense_id_table[sentinel_indicator_name] = cofense_id try: self.sentinel_cofense_id_table_state.post( json.dumps(self.sentinel_cofense_id_table) ) except Exception as error: applogger.error( "{}(method={}) : {} Error: {} : " "Error occurred while posting sentinel cofense mapping id table data" " to sentinel cofense id table state manager.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise error # nothing for 422 status code. except CofenseException as error: applogger.error( "{}(method={}) : {} : Error occurred while creating indicator in cofense : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise CofenseException() def update_indicator(self, cofense_object, cofense_id, threat_level): """To update the indicator threat level in cofense triage.""" try: __method_name = inspect.currentframe().f_code.co_name # Update the indicator data into cofense triage. __method_name = inspect.currentframe().f_code.co_name cofense_object.update_indicators( threat_level=threat_level, id=cofense_id, ) applogger.debug( "{}(method={}) : {} : " "Indicator successfully updated into Cofense with id : {}.".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, cofense_id ) ) except CofenseException as error: applogger.error( "{}(method={}) : {} : Error occurred while updating indicator in cofense : {}".format( LOGS_STARTS_WITH, __method_name, SENTINEL_TO_COFENSE, error ) ) raise CofenseException()